package com.gbase8c.dmt.migration.job;

import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.Engine;
import com.alibaba.datax.core.util.ConfigParser;
import com.gbase8c.dmt.common.util.Exceptions;
import com.gbase8c.dmt.common.util.JsonMapper;
import com.gbase8c.dmt.migration.MoLogger;
import com.gbase8c.dmt.model.enums.MigrationObjectType;
import com.gbase8c.dmt.model.migration.dto.MigrationObject;
import com.gbase8c.dmt.model.migration.record.JobThreadRecord;
import com.gbase8c.dmt.model.enums.Status;
import com.gbase8c.dmt.migration.MigrationObjectService;
import com.gbase8c.dmt.model.migration.record.Mo;
import lombok.extern.slf4j.Slf4j;

import javax.sql.DataSource;
import java.util.List;
import java.util.Map;

@Slf4j
public class DataXJobThread extends AbstractJobThread {

    public DataXJobThread(MigrationObjectType type, MigrationObject dbObject, DataSource tarDataSource, MoLogger moLogger) {
        super(type, dbObject, tarDataSource, moLogger);
    }

    @Override
    public JobThreadRecord call() throws Exception {
        Mo mo = Mo.builder()
                .name(dbObject.getSchema() + "." + dbObject.getName())
                .status(Status.InProgress.name())
                .build();
        moLogger.log(mo);
        JobThreadRecord jobThreadRecord = new JobThreadRecord();
        jobThreadRecord.setFlag(true);
        List<String> contents = dbObject.getContents();
        jobThreadRecord.setSchemaName(dbObject.getSchema());
        try {
            dbObject.setStatus(Status.InProgress);
            String dataXConfig = contents.get(0);
            Configuration configuration = ConfigParser.parseJson(dataXConfig);
            long jobId = MigrationObjectService.toLong(dbObject.getSchema()+"."+dbObject.getName());
            configuration.set("core.container.job.id", jobId);
            Map<String, Object> dataxMap = JsonMapper.nonEmptyMapper().fromJson(dataXConfig, Map.class);
            String jobInfo = JsonMapper.nonEmptyMapper().getMapper().writerWithDefaultPrettyPrinter().writeValueAsString(dataxMap);
            log.info(String.format("start datax job[%s]: ", jobId) + "\n" + jobInfo);
            Engine engine = new Engine();
            engine.start(configuration);
            dbObject.setStatus(Status.Finished);
            mo.setName(Status.Finished.name());
        } catch (Exception e) {
            log.error("datax error!", e);
            dbObject.setStatus(Status.Failure);
            dbObject.setMsg(e.getMessage());
            jobThreadRecord.setErrorLog(Exceptions.getStackTraceAsString(e));
            jobThreadRecord.setFlag(false);
            mo.setStatus(Status.Failure.name());
            mo.setMsg(e.getMessage());
        }
        jobThreadRecord.setDbObject(dbObject);
        return jobThreadRecord;
    }
}
